home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / Miro_Downloader.exe / dl_daemon / bittorrentdtv.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2007-11-12  |  14.1 KB  |  426 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.5)
  3.  
  4. '''Glue code to handle BitTorrent stuff.  Most of this comes from download.py
  5. in the BitTorrent library.
  6. '''
  7. from urlparse import urljoin
  8. from binascii import b2a_hex
  9. from sha import sha
  10. from os import path, makedirs
  11. from socket import error as socketerror
  12. from random import seed
  13. from threading import Thread, Event, Lock
  14. from time import time
  15. from Queue import Queue
  16.  
try:
    from os import getpid
except ImportError:
    # Fallback used only when os.getpid cannot be imported.  The value is
    # mixed into the peer-id hash built in TorrentDownload.download(), so a
    # constant is sufficient there.
    def getpid():
        """Stand-in for os.getpid() that always returns 1."""
        return 1
  23.  
  24.  
  25. from BitTorrent.bitfield import Bitfield
  26. from BitTorrent.btformats import check_message
  27. from BitTorrent.Choker import Choker
  28. from BitTorrent.Storage import Storage
  29. from BitTorrent.StorageWrapper import StorageWrapper
  30. from BitTorrent.Uploader import Upload
  31. from BitTorrent.Downloader import Downloader
  32. from BitTorrent.Connecter import Connecter
  33. from BitTorrent.Encrypter import Encoder
  34. from BitTorrent.RawServer import RawServer
  35. from BitTorrent.Rerequester import Rerequester
  36. from BitTorrent.DownloaderFeedback import DownloaderFeedback
  37. from BitTorrent.RateMeasure import RateMeasure
  38. from BitTorrent.CurrentRateMeasure import Measure
  39. from BitTorrent.PiecePicker import PiecePicker
  40. from BitTorrent.bencode import bencode, bdecode
  41. from BitTorrent.download import defaults
  42. from BitTorrent import version
  43. from natpunch import UPnP_test, UPnP_open_port, UPnP_close_port
  44. import util
  45. import config as dtv_config
  46. import prefs
  47. config = { }
  48. for key, default, description in defaults:
  49.     config[key] = default
  50.  
  51. config['report_hash_failures'] = True
  52. storage_lock = Lock()
  53. upnp_type = UPnP_test(1)
  54. downloader_count = util.ThreadSafeCounter()
  55.  
  56. def calc_max_upload_rate():
  57.     total_rate = int(dtv_config.get(prefs.UPSTREAM_LIMIT_IN_KBS) * 1024)
  58.     downloaders = downloader_count.getvalue()
  59.     if downloaders != 0:
  60.         return total_rate / downloaders
  61.     else:
  62.         return 0
  63.  
  64.  
  65. class TorrentDownload:
  66.     
  67.     def __init__(self, torrent_data, download_to, fast_resume_data = None):
  68.         """Create a new torrent.  torrent_data is the contents of a torrent
  69.         file/url.  download_to is the file/directory to save the torrent to.
  70.         fast_resume_data is data used to quickly restart the torrent, it's
  71.         returned by the shutdown() method.
  72.         """
  73.         self.doneflag = Event()
  74.         self.finflag = Event()
  75.         self.torrent_data = torrent_data
  76.         self.download_to = download_to
  77.         self.fast_resume_data = fast_resume_data
  78.         self.fast_resume_queue = Queue()
  79.         self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'], config['timeout'], errorfunc = self.on_error, maxconnects = config['max_allow_in'])
  80.         self.thread = None
  81.         self.current_status = { }
  82.         self.status_callback = None
  83.         
  84.         self.time_est_func = lambda : 0
  85.         self.last_up_total = self.last_down_total = 0
  86.         self.last_activity = None
  87.         self.rawserver_started = False
  88.         self.minport = dtv_config.get(prefs.BT_MIN_PORT)
  89.         self.maxport = dtv_config.get(prefs.BT_MAX_PORT)
  90.  
  91.     
  92.     def start(self):
  93.         '''Start downloading the torrent.'''
  94.         self.thread = Thread(target = self.download_thread)
  95.         filename = path.basename(self.download_to)
  96.         self.thread.setName('BitTorrent Downloader - %s' % util.stringify(filename, 'replace'))
  97.         self.thread.start()
  98.  
  99.     
  100.     def shutdown(self):
  101.         '''Stop downloading the torrent.
  102.  
  103.         Returns a string that can be used as fast resume data.
  104.         '''
  105.         self.doneflag.set()
  106.         self.rawserver.wakeup()
  107.         if self.rawserver_started:
  108.             
  109.             try:
  110.                 return self.fast_resume_queue.get(timeout = 10)
  111.             except Queue.Empty:
  112.                 return None
  113.             except:
  114.                 None<EXCEPTION MATCH>Queue.Empty
  115.             
  116.  
  117.         None<EXCEPTION MATCH>Queue.Empty
  118.         return self.fast_resume_data
  119.  
  120.     
  121.     def parse_fast_resume_data(self, total_pieces):
  122.         already_got = None
  123.         mtimes = { }
  124.         if self.fast_resume_data is not None:
  125.             
  126.             try:
  127.                 fast_resume = bdecode(self.fast_resume_data)
  128.                 already_got = fast_resume['already_got']
  129.                 mtimes = fast_resume['mtimes']
  130.             import traceback
  131.             print 'WARNING: ERROR parsing fast resume data'
  132.             traceback.print_exc(1)
  133.             self.fast_resume_data = None
  134.  
  135.         
  136.         
  137.         try:
  138.             self.pieces_already_got = Bitfield(total_pieces, already_got)
  139.         except:
  140.             print 'Failed to load resume data'
  141.             self.pieces_already_got = Bitfield(total_pieces, None)
  142.  
  143.         self.fast_resume_mtimes = mtimes
  144.  
  145.     
  146.     def skip_hash_check(self, index, files):
  147.         if not self.pieces_already_got[index]:
  148.             return False
  149.         
  150.         for f in files:
  151.             mtimes_key = f.encode('utf-8')
  152.             if path.getmtime(f) > self.fast_resume_mtimes.get(mtimes_key, 0):
  153.                 return False
  154.                 continue
  155.         
  156.         return True
  157.  
  158.     
  159.     def set_status_callback(self, func):
  160.         """Register a callback function.  func will be called whenever the
  161.         torrent download status changes and periodically while the torrent
  162.         downloads.  It will be passed a dict with the following attributes:
  163.  
  164.         activity -- string specifying what's currently happening or None for
  165.                 normal operations.  
  166.         upRate -- upload rate in B/s
  167.         downRate -- download rate in B/s
  168.         upTotal -- total MB uploaded (this run)
  169.         downTotal -- total MB downloaded (this run)
  170.         fractionDone -- what portion of the download is completed.
  171.         timeEst -- estimated completion time, in seconds.
  172.         totalSize -- total size of the torrent in bytes
  173.         """
  174.         self.status_callback = func
  175.  
  176.     
  177.     def on_error(self, message):
  178.         print 'WARNING BitTorrent error: ', message
  179.  
  180.     
  181.     def on_status(self, status_dict):
  182.         status = {
  183.             'upRate': status_dict.get('upRate', 0),
  184.             'downRate': status_dict.get('downRate', 0),
  185.             'upTotal': status_dict.get('upTotal', self.last_up_total),
  186.             'downTotal': status_dict.get('downTotal', self.last_down_total),
  187.             'timeEst': self.time_est_func(),
  188.             'totalSize': self.total_size }
  189.         if status['timeEst'] is None:
  190.             status['timeEst'] = 0
  191.         
  192.         if self.finflag.isSet():
  193.             status['fractionDone'] = 1
  194.         else:
  195.             status['fractionDone'] = status_dict.get('fractionDone', 0)
  196.         if status['downRate'] > 0 or status['upRate'] > 0:
  197.             status['activity'] = None
  198.         else:
  199.             status['activity'] = status_dict.get('activity', self.last_activity)
  200.         self.last_up_total = status['upTotal']
  201.         self.last_down_total = status['downTotal']
  202.         self.last_activity = status['activity']
  203.         self.status_callback(status)
  204.  
  205.     
  206.     def update_max_upload_rate(self):
  207.         current_rate = calc_max_upload_rate()
  208.         if current_rate != self.max_upload_rate:
  209.             self.connecter.change_max_upload_rate(current_rate)
  210.             self.max_upload_rate = current_rate
  211.         
  212.         self.rawserver.add_task(self.update_max_upload_rate, 5)
  213.  
  214.     
  215.     def filefunc(self, file, length, saveas, isdir):
  216.         self.total_size = length
  217.         return self.download_to
  218.  
  219.     
  220.     def download_thread(self):
  221.         downloader_count.inc()
  222.         
  223.         try:
  224.             self.download()
  225.         finally:
  226.             downloader_count.dec()
  227.  
  228.  
  229.     
    def download(self):
        """Run the whole torrent session on the calling thread.

        Steps, in order: decode and validate the torrent data; create the
        target directories; set up Storage/StorageWrapper while holding
        storage_lock; bind a listening port in [self.minport, self.maxport];
        build the choker, uploader, downloader, connecter, encoder and
        rerequester; then block in rawserver.listen_forever().  When that
        returns, fast resume data (or None on failure) is put on
        self.fast_resume_queue for shutdown() to collect.

        NOTE(review): this listing is decompiler output and is not valid
        Python in several places.  The damaged spots are marked below and
        should be recovered from the original source before this is trusted.
        """
        spewflag = Event()
        
        try:
            response = bdecode(self.torrent_data)
            check_message(response)
        except ValueError:
            # NOTE(review): the bound exception name was lost in decompilation
            # (likely "except ValueError, e"); as listed, str(e) is 'None'.
            e = None
            self.on_error('got bad file info - ' + str(e))
            return None

        
        try:
            
            def make(f, forcedir = False):
                # Create the directory for f: f itself when forcedir is true,
                # otherwise the directory part of f.
                if not forcedir:
                    f = path.split(f)[0]
                
                if f != '' and not path.exists(f):
                    makedirs(f)
                

            info = response['info']
            if info.has_key('length'):
                # The info dict has a top-level 'length': one file.
                file_length = info['length']
                file = self.filefunc(info['name'], file_length, config['saveas'], False)
                if file is None:
                    return None
                
                make(file)
                files = [
                    (file, file_length)]
            else:
                # Otherwise info['files'] lists the entries; total their
                # lengths and lay them out under the chosen directory.
                file_length = 0
                for x in info['files']:
                    file_length += x['length']
                
                file = self.filefunc(info['name'], file_length, config['saveas'], True)
                if file is None:
                    return None
                
                make(file, True)
                files = []
                for x in info['files']:
                    n = file
                    for i in x['path']:
                        n = path.join(n, i)
                    
                    files.append((n, x['length']))
                    make(n)
        except OSError:
            # NOTE(review): bound exception name lost here as well.
            e = None
            self.on_error("Couldn't allocate dir - " + str(e))
            return None
        

        finflag = self.finflag
        # One-element lists (ann, and rm below) are slots filled in later and
        # read by the nested callbacks.
        ann = [
            None]
        # Peer id: 'M' + version with dots replaced by dashes, padded with '-'
        # to 8 characters, plus 12 hex digits from a hash of time and pid.
        myid = 'M' + version.replace('.', '-')
        myid = myid + '-' * (8 - len(myid)) + b2a_hex(sha(repr(time()) + ' ' + str(getpid())).digest()[-6:])
        seed(myid)
        # info['pieces'] is cut into 20-character chunks, one per piece.
        pieces = [ info['pieces'][x:x + 20] for x in xrange(0, len(info['pieces']), 20) ]
        self.parse_fast_resume_data(len(pieces))
        
        def failed(reason):
            # Stop the download and report why, if a reason was given.
            self.doneflag.set()
            if reason is not None:
                self.on_error(reason)
            

        rawserver = self.rawserver
        # storage_lock is module-wide, so this section runs for one torrent
        # at a time.
        storage_lock.acquire()
        
        try:
            
            try:
                storage = Storage(files, open, path.exists, path.getsize)
            except IOError:
                # NOTE(review): the ([],) / [] lines are decompiler residue
                # and e is not the caught exception as listed.
                ([],)
                e = ([],)
                []
                self.on_error('trouble accessing files - ' + str(e))
                return None
            except:
                ([],)

            
            # NOTE(review): the default "storage = (storage,)" is a 1-tuple,
            # on which set_readonly() cannot work; presumably the original
            # default was the storage object itself -- confirm.
            def finished(finflag = finflag, ann = ann, storage = (storage,)):
                # Completion callback handed to StorageWrapper.
                finflag.set()
                
                try:
                    storage.set_readonly()
                except (IOError, OSError):
                    e = None
                    self.on_error('trouble setting readonly at end - ' + str(e))

                if ann[0] is not None:
                    ann[0](1)
                

            rm = [
                None]
            
            # NOTE(review): report_hash_failures defaults to a 1-tuple here,
            # which is always true; presumably a decompiler artifact.
            def data_flunked(amount, rm = rm, report_hash_failures = (config['report_hash_failures'],)):
                # Hash-failure callback handed to StorageWrapper.
                if rm[0] is not None:
                    rm[0](amount)
                
                if report_hash_failures:
                    self.on_error('a piece failed hash check, re-downloading it')
                

            storagewrapper = StorageWrapper(storage, config['download_slice_size'], pieces, info['piece length'], finished, failed, self.on_status, self.doneflag, config['check_hashes'], data_flunked, self.skip_hash_check)
        except ValueError:
            # NOTE(review): residue lines again; e is not the caught exception.
            ([],)
            e = ([],)
            []
            failed('bad data - ' + str(e))
        except IOError:
            []
            e = []
            failed('IOError - ' + str(e))
        except:
            ([],)
        finally:
            storage_lock.release()

        # NOTE(review): after failed() sets doneflag the original presumably
        # returned here; the check seems to have been folded into the garbled
        # expressions further down -- confirm.
        e = 'maxport less than minport - no ports to check'
        for listen_port in xrange(self.minport, self.maxport + 1):
            
            try:
                rawserver.bind(listen_port, config['bind'])
            # NOTE(review): a statement between try and except is a syntax
            # error; this "continue" presumably stood for a "break" inside the
            # try after a successful bind.
            continue
            except socketerror:
                None if self.doneflag.isSet() else []
                e = None if self.doneflag.isSet() else []
                continue
            

        else:
            # No port in the range could be bound.  The message stored in e is
            # never reported in this listing (possibly lost) -- confirm.
            return None
        choker = Choker(config['max_uploads'], rawserver.add_task, finflag.isSet, config['min_uploads'])
        upmeasure = Measure(config['max_rate_period'], config['upload_rate_fudge'])
        downmeasure = Measure(config['max_rate_period'])
        
        def make_upload(connection, choker = choker, storagewrapper = storagewrapper, max_slice_length = config['max_slice_length'], max_rate_period = config['max_rate_period'], fudge = config['upload_rate_fudge']):
            # Upload factory handed to Connecter.
            return Upload(connection, choker, storagewrapper, max_slice_length, max_rate_period, fudge)

        ratemeasure = RateMeasure(storagewrapper.get_amount_left())
        self.time_est_func = ratemeasure.get_time_left
        # Fill the slot read by data_flunked().
        rm[0] = ratemeasure.data_rejected
        picker = PiecePicker(len(pieces), config['rarest_first_cutoff'])
        # Tell the picker which pieces storage already has.
        for i in xrange(len(pieces)):
            if storagewrapper.do_I_have(i):
                picker.complete(i)
                continue
            # NOTE(review): garbled expression.  It mentions doneflag and
            # upnp_type, so it presumably hides statements that belonged
            # elsewhere, including the assignment of upnp_active, which is
            # read at the end of this method but never set here.
            None if self.doneflag.isSet() else [] if upnp_type else None if self.doneflag.isSet() else []
        
        downloader = Downloader(storagewrapper, picker, config['request_backlog'], config['max_rate_period'], len(pieces), downmeasure, config['snub_time'], ratemeasure.data_came_in)
        self.max_upload_rate = calc_max_upload_rate()
        connecter = Connecter(make_upload, downloader, choker, len(pieces), upmeasure, self.max_upload_rate, rawserver.add_task)
        # Kept on self so update_max_upload_rate() can adjust the limit.
        self.connecter = connecter
        infohash = sha(bencode(info)).digest()
        encoder = Encoder(connecter, rawserver, myid, config['max_message_length'], rawserver.add_task, config['keepalive_interval'], infohash, config['max_initiate'])
        rerequest = Rerequester(response['announce'], config['rerequest_interval'], rawserver.add_task, connecter.how_many_connections, config['min_peers'], encoder.start_connection, rawserver.add_task, storagewrapper.get_amount_left, upmeasure.get_total, downmeasure.get_total, listen_port, config['ip'], myid, infohash, config['http_timeout'], self.on_error, config['max_initiate'], self.doneflag, upmeasure.get_rate, downmeasure.get_rate, encoder.ever_got_incoming)
        if config['spew']:
            spewflag.set()
        
        DownloaderFeedback(choker, rawserver.add_task, self.on_status, upmeasure.get_rate, downmeasure.get_rate, upmeasure.get_total, downmeasure.get_total, ratemeasure.get_time_left, ratemeasure.get_size_left, file_length, finflag, config['display_interval'], spewflag)
        self.on_status({
            'activity': 'connecting to peers' })
        # Fill the slot read by finished().
        ann[0] = rerequest.announce
        rerequest.begin()
        self.rawserver.add_task(self.update_max_upload_rate, 5)
        # From here on shutdown() waits on fast_resume_queue instead of
        # returning the old resume data.
        self.rawserver_started = True
        
        try:
            rawserver.listen_forever(encoder)
        finally:
            
            try:
                # NOTE(review): garbled by the decompiler.  Presumably this
                # built a dict with an 'mtimes' entry mapping each file to its
                # integer modification time; parse_fast_resume_data() also
                # reads an 'already_got' entry.  Recover from the original.
                []['mtimes'] = []([ (f, long(path.getmtime(f))) for f, size in files ])
                fast_resume_data = dict
                self.fast_resume_queue.put(bencode(fast_resume_data))
            except:
                # Always queue something so shutdown() does not wait for its
                # full timeout, then let the error propagate.
                self.fast_resume_queue.put(None)
                raise 


        storage.close()
        # NOTE(review): upnp_active is undefined in this listing (see above).
        if upnp_active:
            UPnP_close_port(listen_port)
        
        # finished() announces with 1; 2 is sent at shutdown (presumably the
        # tracker "stopped" event -- confirm against Rerequester).
        rerequest.announce(2)
  424.  
  425.  
  426.